RabbitMq的ack用法 您所在的位置:网站首页 matlab clear用法 RabbitMq的ack用法

RabbitMq的ack用法

#RabbitMq的ack用法| 来源: 网络整理| 查看: 265

仔細檢視一下 Consumer 的回撥方法:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ...... consumerChannel1.basicAck(envelope.getDeliveryTag(), false); }

當我們需要確認一條訊息已經被消費時,我們呼叫的 basicAck 方法的第一個引數是 Delivery Tag。

Delivery Tag 用來標識通道中投遞的訊息。RabbitMQ 推送訊息給 Consumer 時,會附帶一個 Delivery Tag,以便 Consumer 可以在訊息確認時告訴 RabbitMQ 到底是哪條訊息被確認了。

RabbitMQ 保證在每個通道中,每條訊息的 Delivery Tag 從 1 開始遞增。

執行下面的例子可以直觀的看到這點:

gordon.study.rabbitmq.ack.TestAckBasic.java

public class TestAckBasic { private static final String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); final Channel consumerChannel1 = connection.createChannel(); consumerChannel1.queueDeclare(QUEUE_NAME, false, false, false, null); consumerChannel1.basicQos(3); Consumer consumer1 = new DefaultConsumer(consumerChannel1) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.printf("in consumer A (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } consumerChannel1.basicAck(envelope.getDeliveryTag(), false); } }; consumerChannel1.basicConsume(QUEUE_NAME, false, consumer1); final Channel consumerChannel2 = connection.createChannel(); consumerChannel2.basicQos(3); Consumer consumer2 = new DefaultConsumer(consumerChannel2) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.printf("in consumer B (delivery tag is %d): %s\n", envelope.getDeliveryTag(), message); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { } consumerChannel2.basicAck(envelope.getDeliveryTag(), false); } }; consumerChannel2.basicConsume(QUEUE_NAME, false, consumer2); Channel senderChannel = connection.createChannel(); for (int i = 0; i < 10;) { String message = "NO. " + ++i; TimeUnit.MILLISECONDS.sleep(100); senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); } senderChannel.close(); } }

result:

in consumer A (delivery tag is 1): NO. 1 in consumer B (delivery tag is 1): NO. 2 in consumer A (delivery tag is 2): NO. 3 in consumer B (delivery tag is 2): NO. 4 in consumer A (delivery tag is 3): NO. 5 in consumer B (delivery tag is 3): NO. 6 in consumer A (delivery tag is 4): NO. 7 in consumer B (delivery tag is 4): NO. 8 in consumer A (delivery tag is 5): NO. 9 in consumer B (delivery tag is 5): NO. 10

可見,兩個通道的 delivery tag 分別從 1 遞增到 5。(如果修改程式碼,將兩個 Consumer 共享同一個通道,則 delivery tag 是從 1 遞增到 10,參考 gordon.study.rabbitmq.ack.TestAckInOneChannel.java)

basicAck 方法的第二個引數 multiple 取值為 false 時,表示通知 RabbitMQ 當前訊息被確認;如果為 true,則額外將比第一個引數指定的 delivery tag 小的訊息一併確認。(批量確認針對的是整個通道,參考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。)

對同一訊息的重複確認,或者對不存在的訊息的確認,會產生 IO 異常,導致通道關閉。

B. 忘了確認會怎樣

如果我們註釋掉22行,讓 consumerChannel1 不再確認訊息,世界會怎樣?

Unacked messages

只要程式還在執行,這3條訊息就一直是 Unacked 狀態,無法被 RabbitMQ 重新投遞。更厲害的是,RabbitMQ 訊息消費並沒有超時機制,也就是說,程式不重啟,訊息就永遠是 Unacked 狀態。處理運維事件時不要忘了這些 Unacked 狀態的訊息。

當程式關閉時(實際只要 Consumer 關閉就行),這3條訊息會恢復為 Ready 狀態。  

C. 取消確認

當消費訊息出現異常時,我們需要取消確認,這時我們可以使用 Channel 的 basicReject 方法。

void basicReject(long deliveryTag, boolean requeue) throws IOException;

第一個引數指定 delivery tag,第二個引數說明如何處理這個失敗訊息。requeue 值為 true 表示該訊息重新放回佇列頭,值為 false 表示放棄這條訊息。

一般來說,如果是系統無法處理的異常,我們一般是將 requeue 設為 false,例如訊息格式錯誤,再處理多少次也是異常。呼叫第三方介面超時這類異常 requeue 應該設為 true。

從 basicReject 方法引數可見,取消確認不支援批量操作(類似於 basicAck 的 multiple 引數)。所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。參考 https://www.rabbitmq.com/nack.html

PS:Reject 的訊息重新推送來時,delivery tag 就是新的值了。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有